library(tidyverse)
## Warning: package 'tidyverse' was built under R version 3.5.3
## -- Attaching packages --------------------------------------------------------------------------------- tidyverse 1.2.1 --
## v ggplot2 3.1.0     v purrr   0.2.5
## v tibble  1.4.2     v dplyr   0.7.8
## v tidyr   0.8.2     v stringr 1.3.1
## v readr   1.3.1     v forcats 0.3.0
## Warning: package 'readr' was built under R version 3.5.2
## Warning: package 'forcats' was built under R version 3.5.2
## -- Conflicts ------------------------------------------------------------------------------------ tidyverse_conflicts() --
## x dplyr::filter() masks stats::filter()
## x dplyr::lag()    masks stats::lag()
library(DT)
# Read one raw results CSV (written without a header row) and attach the
# standard result-column names.
#
# @param filename Path to a results .csv file.
# @return A data.frame with the eight standard result columns.
read.clean.files <- function(filename) {
  read.csv(
    filename,
    header = FALSE,
    col.names = c("Language", "Randomize", "Dataset", "MachineID",
                  "RunID", "Type", "Operation", "TimeTaken")
  )
}

# ---- Load and clean results ----
# All result files from both environments live under ../Results/.
files <- list.files(path = "../Results/", pattern = ".csv$",
                    recursive = TRUE, full.names = TRUE) # List all .csv files
#files

# Read every file belonging to one environment, stack the rows, and tag
# them with the Setup (environment) they came from.
read.setup.files <- function(setup.files, setup.name) {
  merged <- do.call(rbind, lapply(setup.files, read.clean.files))
  merged$Setup <- setup.name
  return(merged)
}

merged_data <- rbind(
  read.setup.files(files[grepl("Databricks", files)], "Databricks"),
  read.setup.files(files[grepl("Local_VM", files)], "Local VM")
)
merged_data$Setup <- as.factor(merged_data$Setup)

# Collapse the two spellings of the Type labels ("... Operations" vs
# "... Operation") into one before converting to a factor.
merged_data$Type <- as.factor(gsub(pattern = "Operations",
                                   replacement = "Operation",
                                   x = merged_data$Type))
# Drop run 1 of every series — presumably a warm-up run excluded from the
# analysis; TODO confirm with the experiment protocol.
merged_data <- merged_data %>% filter(RunID != 1)

# Convert identifier columns to factors
merged_data$MachineID <- as.factor(merged_data$MachineID)
merged_data$Randomize <- as.factor(merged_data$Randomize)
merged_data$RunID <- as.factor(merged_data$RunID)

# Reduce labels like "dataset_500MB" to the nominal size "500".
merged_data$Dataset <- sub("dataset_", "", merged_data$Dataset)
merged_data$Dataset <- sub("MB$", "", merged_data$Dataset)
merged_data$Dataset <- as.factor(merged_data$Dataset)

# ---- Sanity-check the merged data ----
# Structure: confirm every column has the expected class after cleaning.
str(merged_data)
## 'data.frame':    4134 obs. of  9 variables:
##  $ Language : Factor w/ 2 levels "Scala","PySpark": 1 1 1 1 1 1 1 1 1 1 ...
##  $ Randomize: Factor w/ 1 level "1": 1 1 1 1 1 1 1 1 1 1 ...
##  $ Dataset  : Factor w/ 5 levels "10","100","200",..: 5 5 5 5 5 5 5 5 5 5 ...
##  $ MachineID: Factor w/ 2 levels "1","2": 1 1 1 1 1 1 1 1 1 1 ...
##  $ RunID    : Factor w/ 5 levels "2","3","4","5",..: 1 1 1 1 1 1 1 1 1 1 ...
##  $ Type     : Factor w/ 4 levels "Aggregate Operation",..: 1 1 1 2 2 1 4 4 3 4 ...
##  $ Operation: Factor w/ 37 levels " Filter"," Filter Reg Ex 1",..: 5 4 14 20 21 6 2 3 11 27 ...
##  $ TimeTaken: num  132.6 36.4 32 176.2 179.6 ...
##  $ Setup    : Factor w/ 2 levels "Databricks","Local VM": 1 1 1 1 1 1 1 1 1 1 ...
# First rows: spot-check that values line up with the renamed columns.
head(merged_data)
##   Language Randomize Dataset MachineID RunID                Type
## 1    Scala         1     500         1     2 Aggregate Operation
## 2    Scala         1     500         1     2 Aggregate Operation
## 3    Scala         1     500         1     2 Aggregate Operation
## 4    Scala         1     500         1     2    Column Operation
## 5    Scala         1     500         1     2    Column Operation
## 6    Scala         1     500         1     2 Aggregate Operation
##                 Operation TimeTaken      Setup
## 1      GroupBy 10 columns   132.582 Databricks
## 2        GroupBy 1 column    36.444 Databricks
## 3        Ranking by Group    32.050 Databricks
## 4   Sorting Desc 1 column   176.199 Databricks
## 5  Sorting Desc 10 column   179.554 Databricks
## 6       GroupBy 5 columns    52.993 Databricks
# Summary: check how the runs are balanced across machine, dataset and
# setup, and eyeball the overall TimeTaken distribution.
summary(merged_data)
##     Language    Randomize Dataset    MachineID RunID  
##  Scala  :2647   1:4134    10 :1005   1:2555    2:906  
##  PySpark:1487             100: 960   2:1579    3:889  
##                           200: 712             4:852  
##                           300: 797             5:781  
##                           500: 660             6:706  
##                                                       
##                                                       
##                   Type                           Operation   
##  Aggregate Operation: 558    Merge 2 columns into 1   : 142  
##  Column Operation   :2027    Merge 5 columns into 1   : 142  
##  Mixed Operation    : 423    Pivot 1 Rows and 1 Column: 142  
##  Row Operation      :1126    Shift (Lag)              : 142  
##                              Split 1 Column into 10   : 142  
##                              Filter                   : 141  
##                             (Other)                   :3283  
##    TimeTaken              Setup     
##  Min.   :  0.237   Databricks:3092  
##  1st Qu.:  3.354   Local VM  :1042  
##  Median : 10.271                    
##  Mean   : 24.995                    
##  3rd Qu.: 26.175                    
##  Max.   :275.332                    
## 
# Actual on-disk sizes (MB) of each nominal dataset, hard-coded so the
# document can be re-knit without the original data files present.  The
# commented file.size() expressions show how the values were obtained;
# 200MB and 300MB carry no such expression — presumably measured the same
# way, TODO confirm.
size_10MB =  11.4789848327637 # file.size("../../Data/Databricks/machine2/dataset_10MB.csv")/(1024*1024)
size_100MB = 115.640992164612 # file.size("../../Data/Databricks/machine2/dataset_100MB.csv")/(1024*1024) 
size_200MB = 229.8573  
size_300MB = 343.2709
size_500MB = 576.678165435791 # file.size("../../Data/Databricks/machine2/dataset_500MB.csv")/(1024*1024) 

# Report the true size behind each nominal label.
print(paste("Actual Size of 10MB file (in MB)",size_10MB))
## [1] "Actual Size of 10MB file (in MB) 11.4789848327637"
print(paste("Actual Size of 100MB file (in MB)",size_100MB))
## [1] "Actual Size of 100MB file (in MB) 115.640992164612"
print(paste("Actual Size of 200MB file (in MB)",size_200MB))
## [1] "Actual Size of 200MB file (in MB) 229.8573"
print(paste("Actual Size of 300MB file (in MB)",size_300MB))
## [1] "Actual Size of 300MB file (in MB) 343.2709"
print(paste("Actual Size of 500MB file (in MB)",size_500MB))
## [1] "Actual Size of 500MB file (in MB) 576.678165435791"
# Lookup table mapping each nominal Dataset label to its true size in MB.
size_info <- data.frame(Dataset = c("10","100","200","300","500")
                        ,Size = c(size_10MB,size_100MB,size_200MB,size_300MB,size_500MB))
str(size_info)
## 'data.frame':    5 obs. of  2 variables:
##  $ Dataset: Factor w/ 5 levels "10","100","200",..: 1 2 3 4 5
##  $ Size   : num  11.5 115.6 229.9 343.3 576.7
# Attach the true size to every row and derive throughput (MB processed
# per unit TimeTaken).  The join key is spelled out explicitly — the
# original relied on merge() matching all common columns, which silently
# changes behaviour if another shared column name ever appears.
merged_data <- merged_data %>%
  merge(size_info, by = "Dataset") %>%
  mutate(Throughput = Size/TimeTaken)

Common Functions

# Summarise a grouped results table: per-group run count plus the mean,
# standard deviation, and coefficient of variation of both the raw times
# and the derived throughput.
#
# @param grouped_data A data frame already grouped with group_by().
# @return One summary row per group.
#
# NOTE(fix): the coefficient of variation is SD / Mean; the original
# computed Mean / SD (the inverse, i.e. a signal-to-noise ratio) while
# labelling the columns Coeff_Var_*.
summarize_results <- function(grouped_data) {
  grouped_data %>%
    summarise(n = n()
            ,Mean_Time = round(mean(TimeTaken),2)
            ,Std_Dev_Time= round(sd(TimeTaken),2)
            ,Coeff_Var_Time = round(Std_Dev_Time/Mean_Time,2)
            ,Mean_Throughput = round(mean(Throughput),2)
            ,Std_Dev_Throughput= round(sd(Throughput),2)
            ,Coeff_Var_Throughput = round(Std_Dev_Throughput/Mean_Throughput,2)
            )
}


# Box-plot TimeTaken for every group in `grouped_data`, one facet per
# group, with fill mapped to the column named by `by_var`.  (Despite the
# name, this draws boxplots, not histograms; the name is kept for the
# existing call sites.)
#
# @param grouped_data A data frame already grouped with group_by().
# @param by_var Column name (string) used for the boxplot fill.
# @return The input data augmented with an Index factor column, so
#   callers can look up the rows behind a numbered facet.
plot_hist <- function(grouped_data, by_var) {
  # group_indices() yields one integer group id per row; a single
  # as.factor() suffices (the original converted twice).
  grouped_data$Index <- as.factor(dplyr::group_indices(grouped_data))

  print(ggplot(grouped_data, aes_string(x = "Index", y = "TimeTaken", fill = by_var)) +
    geom_boxplot() +
    facet_wrap(~Index, scales = 'free', ncol = 4))

  grouped_data
}

Databricks vs. Local VM

Table

# One summary row per Type/Operation/Language/Machine/Dataset/Setup combination,
# so Databricks and Local VM rows sit side by side for each workload.
group <- merged_data %>%
  group_by(Type, Operation, Language, MachineID, Dataset, Setup)

result <- group %>% summarize_results()
DT::datatable(result)

Plots

# Group WITHOUT Setup so each facet contrasts the two setups via the fill.
group <- merged_data %>%
  group_by(Type, Operation, Language, MachineID, Dataset)

group2 <- plot_hist(grouped_data = group, by_var = "Setup")

# Evaluate outliers.  plot_hist() returns a data frame with an Index
# column — subset it directly.  The original `group2$data[...]` referenced
# a non-existent column (hence the "Unknown or uninitialised column: 'data'"
# warnings) and rendered an empty table.
DT::datatable(group2[group2$Index == 39, ])

Comparison between dataset sizes

# Summary table with Dataset as the innermost grouping, so consecutive
# rows show how each workload scales across dataset sizes.
group <- merged_data %>%
  group_by(Type, Operation, Language, MachineID, Setup, Dataset)

result <- summarize_results(group)
DT::datatable(result)

Observations

  • Throughput (MB/Time) does not remain constant.
  • For column operations, it increases from 10MB to 100MB but decreases from 100MB to 500MB, implying that there is a sweet spot.

Plots

# Group WITHOUT Dataset so each facet contrasts the dataset sizes via the fill.
group <- merged_data %>%
  group_by(Type, Operation, Language, MachineID, Setup)

group2 <- plot_hist(grouped_data = group, by_var = "Dataset")

# Evaluate outliers (enable as needed)
#DT::datatable(group2[group2$Index == 39,])

Comparison between Scala and PySpark

# Summary table with Language innermost, pairing the Scala and PySpark
# rows for each workload.
group <- merged_data %>%
  group_by(Type, Operation, Dataset, MachineID, Setup, Language)

result <- summarize_results(group)
DT::datatable(result)

Plots

# Group WITHOUT Language so each facet contrasts Scala vs. PySpark via the fill.
group <- merged_data %>%
  group_by(Type, Operation, Dataset, MachineID, Setup)

group2 <- plot_hist(grouped_data = group, by_var = "Language")

Observations

  • In general, Scala seems to be faster than PySpark, which is encouraging and consistent with theory.
#Evaluate outliers
#DT::datatable(group2$data[group2$data$Index == 39,])

Comparison between Machine 1 and Machine 2

# Summary table with MachineID innermost, pairing the Machine 1 and
# Machine 2 rows for each workload.
group <- merged_data %>%
  group_by(Type, Operation, Language, Dataset, Setup, MachineID)

result <- summarize_results(group)
DT::datatable(result)

Plots

# Group WITHOUT MachineID so each facet contrasts the two machines via the fill.
group <- merged_data %>%
  group_by(Type, Operation, Language, Dataset, Setup)

group2 <- plot_hist(grouped_data = group, by_var = "MachineID")

# Evaluate outliers (enable as needed)
#DT::datatable(group2[group2$Index == 39,])